package com.xxx.flink.transform;

import com.xxx.flink.customsource.CustomSource;
import com.xxx.flink.pojo.Event;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 归约聚合
 */
/**
 * Reduce-aggregation demo: continuously tracks the single most active user
 * (the user with the highest visit count seen so far) and prints each update.
 */
public class TransformReduceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Ingest events from the custom source.
        DataStreamSource<Event> events = env.addSource(new CustomSource());

        // Step 1: map each event to (name, 1), key by user name, and maintain
        // a running visit count per user with a rolling reduce.
        SingleOutputStreamOperator<Tuple2<String, Long>> perUserCounts = events
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event event) throws Exception {
                        return Tuple2.of(event.getName(), 1L);
                    }
                })
                .keyBy(tuple -> tuple.f0)  // tuple.f0 is the user name
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> acc,
                                                       Tuple2<String, Long> incoming) throws Exception {
                        // acc holds the accumulated total for this user;
                        // incoming carries exactly one new visit.
                        return Tuple2.of(acc.f0, acc.f1 + incoming.f1);
                    }
                });

        // Step 2: funnel all per-user counts onto one constant key so a second
        // reduce can keep whichever tuple has the largest count overall.
        SingleOutputStreamOperator<Tuple2<String, Long>> mostActive = perUserCounts
                .keyBy(tuple -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> current,
                                                       Tuple2<String, Long> candidate) throws Exception {
                        // Retain the tuple with the higher visit count.
                        return current.f1 > candidate.f1 ? current : candidate;
                    }
                });

        mostActive.print();

        env.execute();
    }
}
